package spark.structed_streaming

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Duration, Seconds}


/**
  * Structured Streaming job that subscribes to the Kafka topic `p2p` and
  * prints each record's key and value, both cast to strings, to the console.
  *
  * The session uses the `local[2]` master, and the application blocks on the
  * streaming query until it is stopped or fails.
  *
  * @author eureka.wh
  * @since 2019-06-05 18:17
  */
object Kafka2SparkStructed02 extends App {

  val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName("Kafka2SparkStructed02")
    .getOrCreate()

  // Brings the implicit encoders needed by `.as[(String, String)]` into scope.
  import spark.implicits._

  // Streaming source: subscribe to the `p2p` topic on the Kafka brokers.
  val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "cm01:9092,cm02:9092,cm03:9092")
    .option("subscribe", "p2p")
    .load()

  // Streaming sink: cast the Kafka key/value columns to strings and print
  // them to the console.
  //
  // The query has no streaming aggregation, so it must use "append" output
  // mode; Spark rejects "complete" for non-aggregated queries with an
  // AnalysisException when the query is started.
  val query = df.selectExpr("cast(key as string)", "cast(value as string)")
    .as[(String, String)]
    .writeStream
    .outputMode("append")
    .format("console")
    .start()

  // Block the main thread until the streaming query terminates.
  query.awaitTermination()

}
